#!/usr/bin/env python

'''Changing masters for a shard closes open changefeeds on that table'''

# issue-url: https://github.com/rethinkdb/rethinkdb/issues/2757
# issue-description: Changing masters for a shard closes open changefeeds on that table
# issue-discussion: This will need to change the last sections for: #3549 and #3547

import os, sys

sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir, 'common'))
import utils, rdb_unittest

# --

class Changefeeds_Master(rdb_unittest.RdbTestCase):
    
    recordsToGenerate = 100
    shards = 2
    replicas = 2
    destructiveTest = True
    
    def test_change_master(self):
    
        utils.print_with_time("Establishing ReQL connections")
        
        shardA, shardB = self.table.config()['shards'][:2].run(self.conn)
        
        serverA1 = [x for x in self.cluster if x.name == shardA['primary_replica']][0]
        serverA2 = [x for x in self.cluster if x.name in shardA['replicas'] and x.name != serverA1.name][0]
        
        serverB1 = [x for x in self.cluster if x.name == shardB['primary_replica']][0]
        serverB2 = [x for x in self.cluster if x.name in shardB['replicas'] and x.name != serverB1.name][0]
        
        connA1 = self.r.connect(host=serverA1.host, port=serverA1.driver_port)
        connA2 = self.r.connect(host=serverA2.host, port=serverA2.driver_port)
        
        connB1 = self.r.connect(host=serverA2.host, port=serverA2.driver_port)
        
        utils.print_with_time("Opening changefeeds")
        
        rangeA, rangeB = utils.getShardRanges(self.conn, self.tableName)[:2]
        
        sampleA_ID = self.table.between(rangeA[0], rangeA[1]).sample(1).nth(0)['id'].run(connA1)
        sampleB_ID = self.table.between(rangeB[0], rangeB[1]).sample(1).nth(0)['id'].run(connB1)
        
        feedA1 = self.table.between(rangeA[0], rangeA[1]).changes().run(connA1)
        feedA2 = self.table.between(rangeA[0], rangeA[1]).changes().run(connA2)
        
        feedB = self.table.between(rangeB[0], rangeB[1]).changes().run(connB1)
        pointFeed = self.table.get(sampleB_ID).changes().run(connB1)
        
        utils.print_with_time("Check that the changefeeds work")
        
        self.table.get(sampleA_ID).update({'update':'a'}).run(self.conn)
        self.table.get(sampleB_ID).update({'update':'a'}).run(self.conn)
        
        
        feedA1.next(wait=.2)
        feedA2.next(wait=.2)
        
        feedB.next(wait=.2)
        pointFeed.next(wait=.2)
        
        utils.print_with_time("Reverse the server assignments")
        
        result = self.table.config().update({'shards':[
            {'primary_replica': serverA2.name, 'replicas':[serverA2.name, serverA1.name]}, # swapped
            {'primary_replica': serverB1.name, 'replicas':[serverB1.name, serverB2.name]}  # unchanged
        ]}).run(self.conn)
        self.assertTrue(result['errors'] == 0, msg='Failed updating table config: %s' % repr(result))
        self.table.wait(timeout=20, wait_for="all_replicas_ready").run(self.conn)
        
        utils.print_with_time("Check that feeds are indeed disconnected on a changed shard")
        
        self.table.get(sampleA_ID).update({'update':'b'}).run(self.conn)

        self.assertRaises(self.r.ReqlRuntimeError, feedA1.next, wait=.2)
        self.assertRaises(self.r.ReqlRuntimeError, feedA2.next, wait=.2)
        
        utils.print_with_time("Check that feeds are also disconnected on an unchanged shard")
        
        self.table.get(sampleB_ID).update({'update':'b'}).run(self.conn)
        
        self.assertRaises(self.r.ReqlRuntimeError, feedB.next, wait=.2)
        self.assertRaises(self.r.ReqlRuntimeError, pointFeed.next, wait=.2)

# ===== main

if __name__ == '__main__':
    rdb_unittest.main()
